package com.atguigu.flink.chapter11.function;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Locale;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class Flink01_Scala {
       public static void main(String[] args) {
               Configuration configuration = new Configuration();
               //web  UI端口
               configuration.setInteger("rest.prot",10000);
               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
               env.setParallelism(2);

           DataStreamSource<WaterSensor> waterSensorStream =
                   env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                           new WaterSensor("sensor_1", 2000L, 20),
                           new WaterSensor("sensor_2", 3000L, 30),
                           new WaterSensor("sensor_1", 4000L, 40),
                           new WaterSensor("sensor_1", 5000L, 50),
                           new WaterSensor("sensor_2", 6000L, 60));

           // 1. 创建表的执行环境
           StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
           Table table = tableEnv.fromDataStream(waterSensorStream);
           tableEnv.createTemporaryView("sensor",table);

           //1.在table API 中使用
           // 1.1 内联的方式使用
          /* table
                   .select($("id"),call(MyToUpperCase.class,$("id")))
                   .execute()
                   .print();
*/

           // 1.2 函数先注册，再使用
           /* tableEnv.createTemporaryFunction("my_to_uppercase",MyToUpperCase.class);
            // 可以注册函数后，填写函数名
           table
                   .select($("id"),call("my_to_uppercase",$("id")))
                   .execute()
                   .print();
*/

           // 2.在sql 中使用
           tableEnv.createTemporaryFunction("my_to_uppercase",MyToUpperCase.class);

           tableEnv.sqlQuery("select id,my_to_uppercase(id) from sensor").execute().print();







           }

           public static class MyToUpperCase extends ScalarFunction {
                // 方法名和修饰符必须这样写，其他根据具体需求
           public String eval(String s){
               if (s!=null) {
                   return s.toUpperCase( );
               }
               return null;
           }


           }

}
